This notebook contains scripts that evaluate sensitivity of TCS compared to cluster-based statistic (for different sample sizes, refer to the supplementary analyses of TCS manuscript).
Loading required packages
import os
import numpy as np
import pandas as pd
import nibabel as nib
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm.notebook import tqdm
Basic functions
def ensure_dir(file_name):
    """Create the parent directory of `file_name` (if missing) and return the path unchanged.

    Handy for wrapping an output path inline, e.g. ``open(ensure_dir(path), 'wb')``.

    BUG FIX: the original called ``os.makedirs(os.path.dirname(file_name))``
    unconditionally, which raises FileNotFoundError for a bare filename
    (``os.path.dirname`` returns ``''`` and ``os.makedirs('')`` fails).
    """
    parent = os.path.dirname(file_name)
    if parent:  # bare filenames have no parent directory to create
        os.makedirs(parent, exist_ok=True)
    return file_name
def write_np(np_obj, file_path):
    """Serialize `np_obj` to `file_path` in numpy's binary .npy format.

    An explicit file handle is passed to ``np.save`` so the path is used
    exactly as given (no automatic '.npy' suffix appended).
    """
    outfile = open(file_path, 'wb')
    try:
        np.save(outfile, np_obj)
    finally:
        outfile.close()
def load_np(file_path):
    """Load and return a numpy object previously saved with `write_np`/`np.save`."""
    infile = open(file_path, 'rb')
    try:
        return np.load(infile)
    finally:
        infile.close()
Plot settings (LaTeX is used for higher-quality text rendering in figures)
# Global plotting style: seaborn defaults with a dark grid background.
sns.set()
sns.set_style("darkgrid")
# Render figures inline at high pixel density.
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
# Use a real LaTeX installation for all figure text (required for the
# \usepackage preamble below; slower, but publication-quality typesetting).
plt.rc('text', usetex=True)
plt.rc('text.latex', preamble=r'\usepackage{mathtools} \usepackage{sfmath}')
# Font sizes for tick and axis labels.
plt.rc('xtick', labelsize=20)
plt.rc('ytick', labelsize=20)
plt.rc('axes', labelsize=24)
# High default DPI for saved/export figures.
plt.rc('figure', dpi=500)
The ground-truth effect-size maps saved by notebook 2 are loaded here.
# Mapping from each HCP task to the cope number of its selected contrast.
tasks = {
    'EMOTION': '3',     # faces - shapes
    'GAMBLING': '6',    # reward - punish
    'RELATIONAL': '4',  # rel - match
    'SOCIAL': '6',      # tom - random
    'WM': '20',         # face - avg
}
# Ground-truth effect-size (Cohen's d) map per task, loaded from disk.
ground_truth_effect = {}
# Base directory where files are stored at
base_dir = '/data/netapp01/work/sina/structural_clustering/PALM_revision_1'
for task, cope in tqdm(tasks.items(), desc="Tasks loop", leave=True):
    ground_truth_effect[task] = load_np(
        f'{base_dir}/ground_truth/cohen_d_{task}_cope{cope}.dscalar.npy',
    )
Tasks loop: 0%| | 0/5 [00:00<?, ?it/s]
The PALM results saved by notebook 1 are loaded here.
%%time
# Number of random repetitions
repetitions = 500
# Different sample sizes tested
sample_sizes = [10, 20, 40, 80, 160, 320]
# Different cluster defining thresholds
cdts = [3.3, 2.8, 2.6, 2.0, 1.6]
# Number of brainordinates in a cifti file
Nv = 91282
# Base directory where files are stored at
base_dir='/data/netapp01/work/sina/structural_clustering/PALM_revision_1'
# Store loaded results in nested python dictionaries
loaded_maps = {}
loaded_maps['uncorrected_tstat'] = {}
loaded_maps['spatial_cluster_corrected_tstat'] = {}
loaded_maps['topological_cluster_corrected_tstat'] = {}
# Only use the z=3.3, p=0.001 for the main analyses reported here
cdt = 3.3
for task in tqdm(tasks, desc="Tasks loop", leave=True):
loaded_maps['uncorrected_tstat'][task] = {}
loaded_maps['spatial_cluster_corrected_tstat'][task] = {}
loaded_maps['topological_cluster_corrected_tstat'][task] = {}
for sample_size in tqdm(sample_sizes, desc="Sample size loop", leave=False):
loaded_maps['uncorrected_tstat'][task][f'N={sample_size}'] = load_np(
f'{base_dir}/summary/uncorrected_tstat_{task}_{sample_size}_samples_{cdt}_CDT.npy',
)
loaded_maps['spatial_cluster_corrected_tstat'][task][f'N={sample_size}'] = load_np(
ensure_dir(f'{base_dir}/summary/spatial_cluster_corrected_tstat_{task}_{sample_size}_samples_{cdt}_CDT.npy'),
)
loaded_maps['topological_cluster_corrected_tstat'][task][f'N={sample_size}'] = load_np(
ensure_dir(f'{base_dir}/summary/topological_cluster_corrected_tstat_{task}_{sample_size}_samples_{cdt}_CDT.npy'),
)
Tasks loop: 0%| | 0/5 [00:00<?, ?it/s]
Sample size loop: 0%| | 0/6 [00:00<?, ?it/s]
Sample size loop: 0%| | 0/6 [00:00<?, ?it/s]
Sample size loop: 0%| | 0/6 [00:00<?, ?it/s]
Sample size loop: 0%| | 0/6 [00:00<?, ?it/s]
Sample size loop: 0%| | 0/6 [00:00<?, ?it/s]
CPU times: user 212 ms, sys: 59 s, total: 59.2 s Wall time: 5min 38s
The script below generates the results of the sensitivity analysis reported in the manuscript.
# Additional imports for the sensitivity analysis below.
import scipy.stats as stats
from scipy.interpolate import CubicSpline
# NOTE(review): UnivariateSpline appears unused in this notebook chunk —
# confirm before removing.
from scipy.interpolate import UnivariateSpline
from statsmodels.stats.power import TTestPower
from matplotlib.patches import Patch
# %config InlineBackend.figure_format = 'svg'
%config InlineBackend.figure_format = 'retina'
# Lower DPI than the global 500 set earlier, to keep this large figure fast.
plt.rc('figure', dpi=300)
# Analytic power calculator for the one-sample t-test reference curves.
analysis = TTestPower()
# Figure grid: 3 rows (spatial / topological / difference) x 5 columns (tasks).
fig = plt.figure(figsize=(30, 12), constrained_layout=True)
gs = fig.add_gridspec(3, 5)
# Sample sizes compared in the sensitivity curves; one rainbow color each.
sample_sizes = [10, 20, 40, 80, 160, 320]
sample_colors = np.array(sns.color_palette("rainbow", len(sample_sizes)))
# Significance cutoff on the -log10(p) scale (i.e. p < 0.05).
logp_threshold = -np.log10(0.05)
# Main sensitivity figure: for each task (column) and each method (row:
# spatial clusters, topological clusters, and their difference), plot the
# empirical detection rate across repetitions as a function of the
# ground-truth effect size, one curve per sample size.
# NOTE(review): indentation was lost in this export and has been
# reconstructed from the control-flow keywords — verify against the
# original .ipynb before relying on exact statement nesting.
for ci, task in enumerate(tasks):
    for ri, method in enumerate(['spatial', 'topological', 'difference']):
        ax = fig.add_subplot(gs[ri, ci])
        # x-axis values: per-brainordinate ground-truth Cohen's d
        scatterx = ground_truth_effect[task]
        for si, sample_size in enumerate(sample_sizes):
            # Uncorrected t-stats (repetitions x brainordinates); drop
            # repetitions containing any NaN.
            t_stats = loaded_maps['uncorrected_tstat'][task][f'N={sample_size}']
            t_stats = t_stats[~np.isnan(t_stats).any(axis=1)]
            if method == 'difference':
                # Third row: topological minus spatial detection rate.
                topological_cluster_logps = loaded_maps['topological_cluster_corrected_tstat'][task][f'N={sample_size}']
                topological_cluster_logps = topological_cluster_logps[~np.isnan(topological_cluster_logps).any(axis=1)]
                # Fraction of repetitions that are significant AND have the
                # correct t-stat sign, masked to brainordinates where the
                # ground truth actually has that sign.
                topological_positive_effects = np.multiply(np.mean((topological_cluster_logps>logp_threshold) & (t_stats>0), 0), (ground_truth_effect[task]>0))
                topological_negative_effects = np.multiply(np.mean((topological_cluster_logps>logp_threshold) & (t_stats<0), 0), (ground_truth_effect[task]<0))
                spatial_cluster_logps = loaded_maps['spatial_cluster_corrected_tstat'][task][f'N={sample_size}']
                spatial_cluster_logps = spatial_cluster_logps[~np.isnan(spatial_cluster_logps).any(axis=1)]
                spatial_positive_effects = np.multiply(np.mean((spatial_cluster_logps>logp_threshold) & (t_stats>0), 0), (ground_truth_effect[task]>0))
                spatial_negative_effects = np.multiply(np.mean((spatial_cluster_logps>logp_threshold) & (t_stats<0), 0), (ground_truth_effect[task]<0))
                topological_scattery = (topological_positive_effects + topological_negative_effects)
                spatial_scattery = (spatial_positive_effects + spatial_negative_effects)
                # y-axis values: difference in detection rate (TCS - CBS)
                scattery = topological_scattery - spatial_scattery
                xlim = (-1.5,1.5)
                # sns.scatterplot(
                #     x=scatterx,
                #     y=scattery,
                #     ax = ax,
                #     s=10,
                #     legend=False,
                #     color=(0.7,0.5,0.1,0.1),
                #     linewidth=0,
                # )
                # cubic spline fit: bin effect sizes into overlapping pairs
                # of adjacent bins and fit a natural spline through the
                # per-bin means.
                bins = np.linspace(max(xlim[0], scatterx.min()), min(xlim[1], scatterx.max()), 31)
                digitized = np.digitize(scatterx, bins)
                x_means = [scatterx[(digitized == i) | (digitized == i + 1)].mean() for i in range(1, len(bins) - 1)]
                x_centers = bins[1:-1]
                y_means = [scattery[(digitized == i) | (digitized == i + 1)].mean() for i in range(1, len(bins) - 1)]
                y_sems = [stats.sem(scattery[(digitized == i) | (digitized == i + 1)]) for i in range(1, len(bins) - 1)]
                # topological_y_means = [topological_scattery[(digitized == i) | (digitized == i + 1)].mean() for i in range(1, len(bins) - 1)]
                # spatial_y_means = [spatial_scattery[(digitized == i) | (digitized == i + 1)].mean() for i in range(1, len(bins) - 1)]
                cs = CubicSpline(x_means, y_means, bc_type='natural', extrapolate=False)
                cs_sem = CubicSpline(x_means, y_sems, bc_type='natural', extrapolate=False)
                # topological_cs = CubicSpline(x_means, topological_y_means, bc_type='natural', extrapolate=False)
                # spatial_cs = CubicSpline(x_means, spatial_y_means, bc_type='natural', extrapolate=False)
                sample_x = np.linspace(scatterx.min(),scatterx.max(),200)
                sample_y = cs(sample_x)
                sample_y_sem = cs_sem(sample_x)
                # methodcorr = topological_cs(meffects) - spatial_cs(meffects)
                # Dotted difference curve, one color per sample size.
                sns.lineplot(
                    x=sample_x,
                    y=sample_y,
                    style=True,
                    dashes=[(1,3)],
                    # color=(0.01,0.05,0.07,1),
                    color=np.append(sample_colors[si], 1),
                    legend=False,
                    linewidth=2,
                    # label=f'N={sample_size}',
                )
                # 95% CI band around the spline (SEM * 1.96).
                ax.fill_between(
                    sample_x,
                    sample_y - (sample_y_sem*1.96),
                    sample_y + (sample_y_sem*1.96),
                    # color = [0.1,0.5,0.7,0.3],
                    color = np.append(sample_colors[si], 0.3),
                )
                # Binned means with 95% CI error bars on top of the spline.
                ax.errorbar(
                    # x_centers,
                    x_means,
                    y_means,
                    # xerr=(x_centers * 0) + ((bins[1] - bins[0])),
                    # xerr=(x_centers * 0),
                    xerr=(np.array(x_means) * 0),
                    yerr=(np.array(y_sems)*1.96),
                    # color=(0.1,0.5,0.7,1),
                    color=np.append(sample_colors[si], 1),
                    fmt='.',
                    linewidth=0,
                    elinewidth=2,
                    ms=2,
                )
                # ax.axhline(y=np.nanmax(sample_y), xmin=xlim[0], xmax=xlim[1], dashes=(2,2), color=(0.9,0.5,0.1,1), linewidth=2,)
                # ax.text(xlim[1]-0.4, float(np.nanmax(sample_y)) * 0.9, '\\textbf{{ {:.1f}\% }}'.format(100*float(np.nanmax(sample_y))), fontsize=16)
                # ax.set_ylim(np.nanmin(sample_y)-0.01,np.nanmax(sample_y)+0.01)
                ax.set_xlim(xlim)
            else:
                # Rows 1-2: absolute detection rate for one method.
                # pass
                cluster_logps = loaded_maps[f'{method}_cluster_corrected_tstat'][task][f'N={sample_size}']
                cluster_logps = cluster_logps[~np.isnan(cluster_logps).any(axis=1)]
                # Fraction of repetitions significant with the correct sign,
                # masked to brainordinates whose ground truth has that sign.
                positive_effects = np.multiply(np.mean((cluster_logps>logp_threshold) & (t_stats>0), 0), (ground_truth_effect[task]>0))
                negative_effects = np.multiply(np.mean((cluster_logps>logp_threshold) & (t_stats<0), 0), (ground_truth_effect[task]<0))
                scattery = positive_effects + negative_effects
                # sns.scatterplot(
                #     x=scatterx,
                #     y=scattery,
                #     ax = ax,
                #     s=10,
                #     legend=False,
                #     color=(0.1,0.5,0.7,0.1),
                #     linewidth=0,
                # )
                effects = np.linspace(-1.5,1.5,200)
                if si == 0:
                    # Analytic reference curves, drawn once per axis:
                    # uncorrected power at the largest N, and Bonferroni
                    # (alpha/Nv) power at the smallest N.
                    # nocorr = [analysis.power(effect_size=x, nobs=40, alpha=0.05) for x in effects]
                    # boncorr = [analysis.power(effect_size=x, nobs=40, alpha=0.05/Nv) for x in effects]
                    nocorr = [analysis.power(effect_size=x, nobs=max(sample_sizes), alpha=0.05) for x in effects]
                    boncorr = [analysis.power(effect_size=x, nobs=min(sample_sizes), alpha=0.05/Nv) for x in effects]
                    sns.lineplot(
                        x=effects,
                        y=nocorr,
                        style=True,
                        dashes=[(2,2)],
                        color=(.3,.3,.3,1),
                        legend=False,
                    )
                    sns.lineplot(
                        x=effects,
                        y=boncorr,
                        style=True,
                        dashes=[(2,2)],
                        color=(.3,.3,.3,1),
                        legend=False,
                    )
                # pass
                # nocorr = [analysis.power(effect_size=x, nobs=sample_size, alpha=0.05) for x in effects]
                # boncorr = [analysis.power(effect_size=x, nobs=sample_size, alpha=0.05/Nv) for x in effects]
                # ax.fill_between(
                #     effects,
                #     boncorr,
                #     nocorr,
                #     # color = [0.1,0.5,0.7,0.3],
                #     color = np.append(sample_colors[si], 0.3),
                # )
                # ax.vlines(
                #     x=-0.5,
                #     ymin=analysis.power(effect_size=-0.5, nobs=40, alpha=0.05/Nv),
                #     ymax=analysis.power(effect_size=-0.5, nobs=40, alpha=0.05),
                #     linestyles='dashed',
                #     # dashes=[(2,2)],
                #     colors=[(0.9,0.5,0.1,1)],
                #     linewidth=2,
                # )
                # ax.vlines(
                #     x=0.5,
                #     ymin=analysis.power(effect_size=0.5, nobs=40, alpha=0.05/Nv),
                #     ymax=analysis.power(effect_size=0.5, nobs=40, alpha=0.05),
                #     linestyles='dashed',
                #     # dashes=[(2,2)],
                #     colors=[(0.9,0.5,0.1,1)],
                #     linewidth=2,
                # )
                # cubic spline fit (same binned-mean scheme as the
                # 'difference' branch, but over the full effect-size range).
                bins = np.linspace(scatterx.min(), scatterx.max(), 31)
                digitized = np.digitize(scatterx, bins)
                x_means = [scatterx[(digitized == i) | (digitized == i + 1)].mean() for i in range(1, len(bins) - 1)]
                y_means = [scattery[(digitized == i) | (digitized == i + 1)].mean() for i in range(1, len(bins) - 1)]
                y_sems = [stats.sem(scattery[(digitized == i) | (digitized == i + 1)]) for i in range(1, len(bins) - 1)]
                cs = CubicSpline(x_means, y_means, bc_type='natural', extrapolate=False)
                cs_sem = CubicSpline(x_means, y_sems, bc_type='natural', extrapolate=False)
                sample_x = np.linspace(scatterx.min(),scatterx.max(),200)
                sample_y = cs(sample_x)
                # methodcorr = [float(cs(x)) for x in effects]
                sample_y_sem = cs_sem(sample_x)
                # Solid detection-rate curve, one color per sample size.
                sns.lineplot(
                    x=sample_x,
                    y=sample_y,
                    style=True,
                    # dashes=[(2,2)],
                    # color=(0.01,0.05,0.07,1),
                    color=np.append(sample_colors[si], 1),
                    legend=False,
                    # label=f'N={sample_size}',
                    linewidth=3,
                )
                # ax.fill_between(
                #     sample_x,
                #     sample_y - (sample_y_sem*1.96),
                #     sample_y + (sample_y_sem*1.96),
                #     # color = [0.1,0.5,0.7,0.3],
                #     color = np.append(sample_colors[si], 0.3),
                # )
                # ax.text(-0.5 + 0.05, float(cs(-0.5)), '\\textbf{{ {:.0f}\% }}'.format(100*float(cs(-0.5))), fontsize=16)
                # ax.text(0.5 + 0.05, float(cs(0.5)), '\\textbf{{ {:.0f}\% }}'.format(100*float(cs(0.5))), fontsize=16)
                ax.set_xlim(-1.5,1.5)
                ax.set_ylim(-0.05,1.05)
        # Per-axis cosmetics (after all sample sizes are drawn).
        # if ri == 0:
        #     ax.set_title(formal_names[method], fontsize=20)
        xlabel = ''
        if ri == 2:
            # only the bottom row gets an x-axis label
            xlabel = 'Effect size ($d$)'
        ax.set_xlabel(xlabel, fontsize=40)
        ylabel = ''
        if ci == 0:
            # NOTE(review): ylabel is computed but ax.set_ylabel is
            # commented out below, so it is currently unused.
            ylabel = '{}'.format(task)
        # if (ci == 2) & (ri == 2):
        # if (ci == 1) & (ri == 0):
        #     ax.legend(loc='lower center', bbox_to_anchor=(0., 0.), ncol=len(sample_sizes))
        # ax.set_ylabel(ylabel, fontsize=20)
        # Seaborn-darkgrid-like styling applied manually.
        ax.set_facecolor(np.array([234,234,242])/255)
        ax.grid(color=(0.99,0.99,0.99,), linewidth=3)
        ax.spines['top'].set_visible(False)
        ax.spines['right'].set_visible(False)
        ax.spines['bottom'].set_visible(False)
        ax.spines['left'].set_visible(False)
        ax.tick_params(axis='both', colors=(0.5,0.5,0.5), labelcolor=(0,0,0), direction='out')
# Shared figure legend: one color patch per sample size, centered above the grid.
legend_handles = []
legend_labels = []
for si, sample_size in enumerate(sample_sizes):
    legend_handles.append(Patch(facecolor=np.append(sample_colors[si], 1),))
    legend_labels.append(f'N={sample_size}')
fig.legend(
    handles=legend_handles,
    labels=legend_labels,
    loc='lower center',
    bbox_to_anchor=(0.5, 1.0),
    ncol=len(sample_sizes), fontsize=40,
    # title="Sample size",
)
plt.show()